// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Futures provides the futures generated by [`Operator`]
//!
//! By using futures, users can add more options for operation.

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::RangeBounds;
use std::time::Duration;

use crate::raw::*;
use crate::*;
use futures::Future;

/// OperatorFuture is the future generated by [`Operator`].
///
/// The future will consume all the input to generate a future.
///
/// # NOTES
///
/// This struct is by design to keep in crate. We don't want
/// users to use this struct directly.
pub struct OperatorFuture<I, O, F: Future<Output = Result<O>>> {
    /// The accessor to the underlying object storage
    acc: Accessor,
    /// The path of string
    path: String,
    /// The input args
    args: I,
    /// The function which will move all the args and return a static future
    f: fn(Accessor, String, I) -> F,
}

impl<I, O, F: Future<Output = Result<O>>> OperatorFuture<I, O, F> {
    /// # NOTES
    ///
    /// This struct is by design to keep in crate. We don't want
    /// users to use this struct directly.
    pub(crate) fn new(
        inner: Accessor,
        path: String,
        args: I,
        f: fn(Accessor, String, I) -> F,
    ) -> Self {
        OperatorFuture {
            acc: inner,
            path,
            args,
            f,
        }
    }
}

impl<I, O, F> IntoFuture for OperatorFuture<I, O, F>
where
    F: Future<Output = Result<O>>,
{
    type Output = Result<O>;
    type IntoFuture = F;

    fn into_future(self) -> Self::IntoFuture {
        (self.f)(self.acc, self.path, self.args)
    }
}

/// Future that generated by [`Operator::stat_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureStat<F> = OperatorFuture<options::StatOptions, Metadata, F>;

impl<F: Future<Output = Result<Metadata>>> FutureStat<F> {
    /// Set the If-Match for this operation.
    ///
    /// Refer to [`options::StatOptions::if_match`] for more details.
    pub fn if_match(mut self, v: &str) -> Self {
        self.args.if_match = Some(v.to_string());
        self
    }

    /// Set the If-None-Match for this operation.
    ///
    /// Refer to [`options::StatOptions::if_none_match`] for more details.
    pub fn if_none_match(mut self, v: &str) -> Self {
        self.args.if_none_match = Some(v.to_string());
        self
    }

    /// Set the If-Modified-Since for this operation.
    ///
    /// Refer to [`options::StatOptions::if_modified_since`] for more details.
    pub fn if_modified_since(mut self, v: Timestamp) -> Self {
        self.args.if_modified_since = Some(v);
        self
    }

    /// Set the If-Unmodified-Since for this operation.
    ///
    /// Refer to [`options::StatOptions::if_unmodified_since`] for more details.
    pub fn if_unmodified_since(mut self, v: Timestamp) -> Self {
        self.args.if_unmodified_since = Some(v);
        self
    }

    /// Set the version for this operation.
    ///
    /// Refer to [`options::StatOptions::version`] for more details.
    pub fn version(mut self, v: &str) -> Self {
        self.args.version = Some(v.to_string());
        self
    }
}

/// Future that generated by [`Operator::presign_stat_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FuturePresignStat<F> =
    OperatorFuture<(options::StatOptions, Duration), PresignedRequest, F>;

impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignStat<F> {
    /// Refer to [`options::StatOptions::override_content_disposition`] for more details.
    pub fn override_content_disposition(mut self, v: &str) -> Self {
        self.args.0.override_content_disposition = Some(v.to_string());
        self
    }

    /// Refer to [`options::StatOptions::override_cache_control`] for more details.
    pub fn override_cache_control(mut self, v: &str) -> Self {
        self.args.0.override_cache_control = Some(v.to_string());
        self
    }

    /// Refer to [`options::StatOptions::override_content_type`] for more details.
    pub fn override_content_type(mut self, v: &str) -> Self {
        self.args.0.override_content_type = Some(v.to_string());
        self
    }

    /// Refer to [`options::StatOptions::if_match`] for more details.
    pub fn if_match(mut self, v: &str) -> Self {
        self.args.0.if_match = Some(v.to_string());
        self
    }

    /// Refer to [`options::StatOptions::if_none_match`] for more details.
    pub fn if_none_match(mut self, v: &str) -> Self {
        self.args.0.if_none_match = Some(v.to_string());
        self
    }
}

/// Future that generated by [`Operator::presign_delete_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FuturePresignDelete<F> =
    OperatorFuture<(options::DeleteOptions, Duration), PresignedRequest, F>;

impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignDelete<F> {}

/// Future that generated by [`Operator::presign_read_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FuturePresignRead<F> =
    OperatorFuture<(options::ReadOptions, Duration), PresignedRequest, F>;

impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignRead<F> {
    /// Refer to [`options::ReadOptions::override_content_disposition`] for more details.
    pub fn override_content_disposition(mut self, v: &str) -> Self {
        self.args.0.override_content_disposition = Some(v.to_string());
        self
    }

    /// Refer to [`options::ReadOptions::override_cache_control`] for more details.
    pub fn override_cache_control(mut self, v: &str) -> Self {
        self.args.0.override_cache_control = Some(v.to_string());
        self
    }

    /// Refer to [`options::ReadOptions::override_content_type`] for more details.
    pub fn override_content_type(mut self, v: &str) -> Self {
        self.args.0.override_content_type = Some(v.to_string());
        self
    }

    /// Refer to [`options::ReadOptions::if_match`] for more details.
    pub fn if_match(mut self, v: &str) -> Self {
        self.args.0.if_match = Some(v.to_string());
        self
    }

    /// Refer to [`options::ReadOptions::if_none_match`] for more details.
    pub fn if_none_match(mut self, v: &str) -> Self {
        self.args.0.if_none_match = Some(v.to_string());
        self
    }
}

/// Future that generated by [`Operator::presign_write_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FuturePresignWrite<F> =
    OperatorFuture<(options::WriteOptions, Duration), PresignedRequest, F>;

impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignWrite<F> {
    /// Refer to [`options::WriteOptions::content_type`] for more details.
    pub fn content_type(mut self, v: &str) -> Self {
        self.args.0.content_type = Some(v.to_string());
        self
    }

    /// Refer to [`options::WriteOptions::content_disposition`] for more details.
    pub fn content_disposition(mut self, v: &str) -> Self {
        self.args.0.content_disposition = Some(v.to_string());
        self
    }

    /// Refer to [`options::WriteOptions::content_encoding`] for more details.
    pub fn content_encoding(mut self, v: &str) -> Self {
        self.args.0.content_encoding = Some(v.to_string());
        self
    }

    /// Refer to [`options::WriteOptions::cache_control`] for more details.
    pub fn cache_control(mut self, v: &str) -> Self {
        self.args.0.cache_control = Some(v.to_string());
        self
    }
}

/// Future that generated by [`Operator::read_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureRead<F> = OperatorFuture<options::ReadOptions, Buffer, F>;

impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
    /// Set `range` for this `read` request.
    ///
    /// If we have a file with size `n`.
    ///
    /// - `..` means read bytes in range `[0, n)` of file.
    /// - `0..1024` and `..1024` means read bytes in range `[0, 1024)` of file
    /// - `1024..` means read bytes in range `[1024, n)` of file
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::TryStreamExt;
    /// # async fn test(op: Operator) -> Result<()> {
    /// let bs = op.read_with("path/to/file").range(0..1024).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
        self.args.range = range.into();
        self
    }

    /// Set `concurrent` for the reader.
    ///
    /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users
    /// read large chunks of data. By setting `concurrent`, opendal will read files concurrently
    /// on support storage services.
    ///
    /// By setting `concurrent`, opendal will fetch chunks concurrently with
    /// the given chunk size.
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # async fn test(op: Operator) -> Result<()> {
    /// let r = op.read_with("path/to/file").concurrent(8).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn concurrent(mut self, concurrent: usize) -> Self {
        self.args.concurrent = concurrent.max(1);
        self
    }

    /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs.
    ///
    /// This following example will make opendal read data in 4MiB chunks:
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # async fn test(op: Operator) -> Result<()> {
    /// let r = op.read_with("path/to/file").chunk(4 * 1024 * 1024).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn chunk(mut self, chunk_size: usize) -> Self {
        self.args.chunk = Some(chunk_size);
        self
    }

    /// Set `version` for this `read` request.
    ///
    /// This feature can be used to retrieve the data of a specified version of the given path.
    ///
    /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    ///
    /// # async fn test(op: Operator, version: &str) -> Result<()> {
    /// let mut bs = op.read_with("path/to/file").version(version).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn version(mut self, v: &str) -> Self {
        self.args.version = Some(v.to_string());
        self
    }

    /// Set `if_match` for this `read` request.
    ///
    /// This feature can be used to check if the file's `ETag` matches the given `ETag`.
    ///
    /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`]
    /// will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
    /// let mut metadata = op.read_with("path/to/file").if_match(etag).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_match(mut self, v: &str) -> Self {
        self.args.if_match = Some(v.to_string());
        self
    }

    /// Set `if_none_match` for this `read` request.
    ///
    /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`.
    ///
    /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`]
    /// will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
    /// let mut metadata = op.read_with("path/to/file").if_none_match(etag).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_none_match(mut self, v: &str) -> Self {
        self.args.if_none_match = Some(v.to_string());
        self
    }

    /// ## `if_modified_since`
    ///
    /// Set `if_modified_since` for this `read` request.
    ///
    /// This feature can be used to check if the file has been modified since the given timestamp.
    ///
    /// If file exists and it hasn't been modified since the specified time, an error with kind
    /// [`ErrorKind::ConditionNotMatch`] will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use jiff::Timestamp;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, time: Timestamp) -> Result<()> {
    /// let mut metadata = op.read_with("path/to/file").if_modified_since(time).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_modified_since(mut self, v: impl Into<Timestamp>) -> Self {
        self.args.if_modified_since = Some(v.into());
        self
    }

    /// Set `if_unmodified_since` for this `read` request.
    ///
    /// This feature can be used to check if the file hasn't been modified since the given timestamp.
    ///
    /// If file exists and it has been modified since the specified time, an error with kind
    /// [`ErrorKind::ConditionNotMatch`] will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use jiff::Timestamp;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, time: Timestamp) -> Result<()> {
    /// let mut metadata = op
    ///     .read_with("path/to/file")
    ///     .if_unmodified_since(time)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_unmodified_since(mut self, v: impl Into<Timestamp>) -> Self {
        self.args.if_unmodified_since = Some(v.into());
        self
    }
}

/// Future that generated by [`Operator::read_with`] or [`Operator::reader_with`].
///
/// Users can add more options by public functions provided by this struct.
///
/// # Notes
///
/// `(OpRead, ())` is a trick to make sure `FutureReader` is different from `FutureRead`
pub type FutureReader<F> = OperatorFuture<options::ReaderOptions, Reader, F>;

impl<F: Future<Output = Result<Reader>>> FutureReader<F> {
    /// Set `version` for this `reader`.
    ///
    /// This feature can be used to retrieve the data of a specified version of the given path.
    ///
    /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    ///
    /// # async fn test(op: Operator, version: &str) -> Result<()> {
    /// let mut r = op.reader_with("path/to/file").version(version).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn version(mut self, v: &str) -> Self {
        self.args.version = Some(v.to_string());
        self
    }

    /// Set `concurrent` for the reader.
    ///
    /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users
    /// read large chunks of data. By setting `concurrent`, opendal will reading files concurrently
    /// on support storage services.
    ///
    /// By setting `concurrent`, opendal will fetch chunks concurrently with
    /// the give chunk size.
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # async fn test(op: Operator) -> Result<()> {
    /// let r = op.reader_with("path/to/file").concurrent(8).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn concurrent(mut self, concurrent: usize) -> Self {
        self.args.concurrent = concurrent.max(1);
        self
    }

    /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs.
    ///
    /// This following example will make opendal read data in 4MiB chunks:
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # async fn test(op: Operator) -> Result<()> {
    /// let r = op
    ///     .reader_with("path/to/file")
    ///     .chunk(4 * 1024 * 1024)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn chunk(mut self, chunk_size: usize) -> Self {
        self.args.chunk = Some(chunk_size);
        self
    }

    /// Controls the optimization strategy for range reads in [`Reader::fetch`].
    ///
    /// When performing range reads, if the gap between two requested ranges is smaller than
    /// the configured `gap` size, OpenDAL will merge these ranges into a single read request
    /// and discard the unrequested data in between. This helps reduce the number of API calls
    /// to remote storage services.
    ///
    /// This optimization is particularly useful when performing multiple small range reads
    /// that are close to each other, as it reduces the overhead of multiple network requests
    /// at the cost of transferring some additional data.
    ///
    /// In this example, if two requested ranges are separated by less than 1MiB,
    /// they will be merged into a single read request:
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # async fn test(op: Operator) -> Result<()> {
    /// let r = op
    ///     .reader_with("path/to/file")
    ///     .chunk(4 * 1024 * 1024)
    ///     .gap(1024 * 1024) // 1MiB gap
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn gap(mut self, gap_size: usize) -> Self {
        self.args.gap = Some(gap_size);
        self
    }

    /// Set `if-match` for this `read` request.
    ///
    /// This feature can be used to check if the file's `ETag` matches the given `ETag`.
    ///
    /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`]
    /// will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
    /// let mut r = op.reader_with("path/to/file").if_match(etag).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_match(mut self, etag: &str) -> Self {
        self.args.if_match = Some(etag.to_string());
        self
    }

    /// Set `if-none-match` for this `read` request.
    ///
    /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`.
    ///
    /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`]
    /// will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
    /// let mut r = op.reader_with("path/to/file").if_none_match(etag).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_none_match(mut self, etag: &str) -> Self {
        self.args.if_none_match = Some(etag.to_string());
        self
    }

    /// Set `if-modified-since` for this `read` request.
    ///
    /// This feature can be used to check if the file has been modified since the given timestamp.
    ///
    /// If file exists and it hasn't been modified since the specified time, an error with kind
    /// [`ErrorKind::ConditionNotMatch`] will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use jiff::Timestamp;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, time: Timestamp) -> Result<()> {
    /// let mut r = op
    ///     .reader_with("path/to/file")
    ///     .if_modified_since(time)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_modified_since(mut self, v: impl Into<Timestamp>) -> Self {
        self.args.if_modified_since = Some(v.into());
        self
    }

    /// Set `if-unmodified-since` for this `read` request.
    ///
    /// This feature can be used to check if the file hasn't been modified since the given timestamp.
    ///
    /// If file exists and it has been modified since the specified time, an error with kind
    /// [`ErrorKind::ConditionNotMatch`] will be returned.
    ///
    /// ```
    /// # use opendal::Result;
    /// use jiff::Timestamp;
    /// use opendal::Operator;
    /// # async fn test(op: Operator, time: Timestamp) -> Result<()> {
    /// let mut r = op
    ///     .reader_with("path/to/file")
    ///     .if_unmodified_since(time)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_unmodified_since(mut self, v: impl Into<Timestamp>) -> Self {
        self.args.if_unmodified_since = Some(v.into());
        self
    }
}

/// Future that generated by [`Operator::write_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureWrite<F> = OperatorFuture<(options::WriteOptions, Buffer), Metadata, F>;

impl<F: Future<Output = Result<Metadata>>> FutureWrite<F> {
    /// Sets append mode for this write request.
    ///
    /// Refer to [`options::WriteOptions::append`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .append(true)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn append(mut self, v: bool) -> Self {
        self.args.0.append = v;
        self
    }

    /// Sets chunk size for buffered writes.
    ///
    /// Refer to [`options::WriteOptions::chunk`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Set 8MiB chunk size - data will be sent in one API call at close
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .chunk(8 * 1024 * 1024)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn chunk(mut self, v: usize) -> Self {
        self.args.0.chunk = Some(v);
        self
    }

    /// Sets concurrent write operations for this writer.
    ///
    /// Refer to [`options::WriteOptions::concurrent`] for more details.
    ///
    /// ## Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Enable concurrent writes with 8 parallel operations at 128B chunk.
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .chunk(128)
    ///     .concurrent(8)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn concurrent(mut self, v: usize) -> Self {
        self.args.0.concurrent = v.max(1);
        self
    }

    /// Sets Cache-Control header for this write operation.
    ///
    /// Refer to [`options::WriteOptions::cache_control`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Cache content for 7 days (604800 seconds)
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .cache_control("max-age=604800")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn cache_control(mut self, v: &str) -> Self {
        self.args.0.cache_control = Some(v.to_string());
        self
    }

    /// Sets `Content-Type` header for this write operation.
    ///
    /// Refer to [`options::WriteOptions::content_type`] for more details.
    ///
    /// ## Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Set content type for plain text file
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .content_type("text/plain")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn content_type(mut self, v: &str) -> Self {
        self.args.0.content_type = Some(v.to_string());
        self
    }

    /// Sets Content-Disposition header for this write request.
    ///
    /// Refer to [`options::WriteOptions::content_disposition`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .content_disposition("attachment; filename=\"filename.jpg\"")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn content_disposition(mut self, v: &str) -> Self {
        self.args.0.content_disposition = Some(v.to_string());
        self
    }

    /// Sets Content-Encoding header for this write request.
    ///
    /// Refer to [`options::WriteOptions::content_encoding`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .content_encoding("gzip")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn content_encoding(mut self, v: &str) -> Self {
        self.args.0.content_encoding = Some(v.to_string());
        self
    }

    /// Sets If-Match header for this write request.
    ///
    /// Refer to [`options::WriteOptions::if_match`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .if_match("\"686897696a7c876b7e\"")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_match(mut self, s: &str) -> Self {
        self.args.0.if_match = Some(s.to_string());
        self
    }

    /// Sets If-None-Match header for this write request.
    ///
    /// Refer to [`options::WriteOptions::if_none_match`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .if_none_match("\"686897696a7c876b7e\"")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_none_match(mut self, s: &str) -> Self {
        self.args.0.if_none_match = Some(s.to_string());
        self
    }

    /// Sets the condition that write operation will succeed only if target does not exist.
    ///
    /// Refer to [`options::WriteOptions::if_not_exists`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .if_not_exists(true)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_not_exists(mut self, b: bool) -> Self {
        self.args.0.if_not_exists = b;
        self
    }

    /// Sets user metadata for this write request.
    ///
    /// Refer to [`options::WriteOptions::user_metadata`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .write_with("path/to/file", vec![0; 4096])
    ///     .user_metadata([
    ///         ("language".to_string(), "rust".to_string()),
    ///         ("author".to_string(), "OpenDAL".to_string()),
    ///     ])
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn user_metadata(mut self, data: impl IntoIterator<Item = (String, String)>) -> Self {
        self.args.0.user_metadata = Some(HashMap::from_iter(data));
        self
    }
}

/// Future that generated by [`Operator::writer_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureWriter<F> = OperatorFuture<options::WriteOptions, Writer, F>;

impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
    /// Sets append mode for this write request.
    ///
    /// Refer to [`options::WriteOptions::append`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op.writer_with("path/to/file").append(true).await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn append(mut self, v: bool) -> Self {
        self.args.append = v;
        self
    }

    /// Sets chunk size for buffered writes.
    ///
    /// Refer to [`options::WriteOptions::chunk`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Set 8MiB chunk size - data will be sent in one API call at close
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .chunk(8 * 1024 * 1024)
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn chunk(mut self, v: usize) -> Self {
        self.args.chunk = Some(v);
        self
    }

    /// Sets concurrent write operations for this writer.
    ///
    /// Refer to [`options::WriteOptions::concurrent`] for more details.
    ///
    /// ## Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Enable concurrent writes with 8 parallel operations
    /// let mut w = op.writer_with("path/to/file").concurrent(8).await?;
    ///
    /// // First write starts immediately
    /// w.write(vec![0; 4096]).await?;
    ///
    /// // Second write runs concurrently with first
    /// w.write(vec![1; 4096]).await?;
    ///
    /// // Ensures all writes complete successfully and in order
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn concurrent(mut self, v: usize) -> Self {
        self.args.concurrent = v.max(1);
        self
    }

    /// Sets Cache-Control header for this write operation.
    ///
    /// Refer to [`options::WriteOptions::cache_control`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Cache content for 7 days (604800 seconds)
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .cache_control("max-age=604800")
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn cache_control(mut self, v: &str) -> Self {
        self.args.cache_control = Some(v.to_string());
        self
    }

    /// Sets `Content-Type` header for this write operation.
    ///
    /// Refer to [`options::WriteOptions::content_type`] for more details.
    ///
    /// ## Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// // Set content type for plain text file
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .content_type("text/plain")
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn content_type(mut self, v: &str) -> Self {
        self.args.content_type = Some(v.to_string());
        self
    }

    /// Sets Content-Disposition header for this write request.
    ///
    /// Refer to [`options::WriteOptions::content_disposition`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .content_disposition("attachment; filename=\"filename.jpg\"")
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn content_disposition(mut self, v: &str) -> Self {
        self.args.content_disposition = Some(v.to_string());
        self
    }

    /// Sets Content-Encoding header for this write request.
    ///
    /// Refer to [`options::WriteOptions::content_encoding`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .content_encoding("gzip")
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn content_encoding(mut self, v: &str) -> Self {
        self.args.content_encoding = Some(v.to_string());
        self
    }

    /// Sets If-Match header for this write request.
    ///
    /// Refer to [`options::WriteOptions::if_match`] for more details.
    ///
    /// ### Behavior
    ///
    /// - If supported, the write operation will only succeed if the target's ETag matches the specified value
    /// - The value should be a valid ETag string
    /// - Common values include:
    ///   - A specific ETag value like `"686897696a7c876b7e"`
    ///   - `*` - Matches any existing resource
    /// - If not supported, the value will be ignored
    ///
    /// This operation provides conditional write functionality based on ETag matching,
    /// helping prevent unintended overwrites in concurrent scenarios.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .if_match("\"686897696a7c876b7e\"")
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_match(mut self, s: &str) -> Self {
        self.args.if_match = Some(s.to_string());
        self
    }

    /// Sets If-None-Match header for this write request.
    ///
    /// Refer to [`options::WriteOptions::if_none_match`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .if_none_match("\"686897696a7c876b7e\"")
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_none_match(mut self, s: &str) -> Self {
        self.args.if_none_match = Some(s.to_string());
        self
    }

    /// Sets the condition that write operation will succeed only if target does not exist.
    ///
    /// Refer to [`options::WriteOptions::if_not_exists`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op.writer_with("path/to/file").if_not_exists(true).await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.write(vec![1; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_not_exists(mut self, b: bool) -> Self {
        self.args.if_not_exists = b;
        self
    }

    /// Sets user metadata for this write request.
    ///
    /// Refer to [`options::WriteOptions::user_metadata`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    /// # use futures::StreamExt;
    /// # use futures::SinkExt;
    /// use bytes::Bytes;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let mut w = op
    ///     .writer_with("path/to/file")
    ///     .user_metadata([
    ///         ("content-type".to_string(), "text/plain".to_string()),
    ///         ("author".to_string(), "OpenDAL".to_string()),
    ///     ])
    ///     .await?;
    /// w.write(vec![0; 4096]).await?;
    /// w.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn user_metadata(mut self, data: impl IntoIterator<Item = (String, String)>) -> Self {
        self.args.user_metadata = Some(HashMap::from_iter(data));
        self
    }
}

/// Future that generated by [`Operator::delete_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureDelete<F> = OperatorFuture<options::DeleteOptions, (), F>;

impl<F: Future<Output = Result<()>>> FutureDelete<F> {
    /// Change the version of this delete operation.
    pub fn version(mut self, v: &str) -> Self {
        self.args.version = Some(v.to_string());
        self
    }
}

/// Future that generated by [`Operator::deleter_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureDeleter<F> = OperatorFuture<OpDeleter, (), F>;

/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureList<F> = OperatorFuture<options::ListOptions, Vec<Entry>, F>;

impl<F: Future<Output = Result<Vec<Entry>>>> FutureList<F> {
    /// The limit passed to underlying service to specify the max results
    /// that could return per-request.
    ///
    /// Users could use this to control the memory usage of list operation.
    pub fn limit(mut self, v: usize) -> Self {
        self.args.limit = Some(v);
        self
    }

    /// The start_after passes to underlying service to specify the specified key
    /// to start listing from.
    pub fn start_after(mut self, v: &str) -> Self {
        self.args.start_after = Some(v.to_string());
        self
    }

    /// The recursive is used to control whether the list operation is recursive.
    ///
    /// - If `false`, list operation will only list the entries under the given path.
    /// - If `true`, list operation will list all entries that starts with given path.
    ///
    /// Default to `false`.
    pub fn recursive(mut self, v: bool) -> Self {
        self.args.recursive = v;
        self
    }

    /// Controls whether the `list` operation should return file versions.
    ///
    /// This function allows you to specify if the `list` operation, when executed, should include
    /// information about different versions of files, if versioning is supported and enabled.
    ///
    /// If `true`, subsequent `list` operations will include version information for each file.
    /// If `false`, version information will be omitted from the `list` results.
    ///
    /// Default to `false`
    pub fn versions(mut self, v: bool) -> Self {
        self.args.versions = v;
        self
    }

    /// Controls whether the `list` operation should include deleted files (or versions).
    ///
    /// This function allows you to specify if the `list` operation, when executed, should include
    /// entries for files or versions that have been marked as deleted. This is particularly relevant
    /// in object storage systems that support soft deletion or versioning.
    ///
    /// If `true`, subsequent `list` operations will include deleted files or versions.
    /// If `false`, deleted files or versions will be excluded from the `list` results.
    pub fn deleted(mut self, v: bool) -> Self {
        self.args.deleted = v;
        self
    }
}

/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureLister<F> = OperatorFuture<options::ListOptions, Lister, F>;

impl<F: Future<Output = Result<Lister>>> FutureLister<F> {
    /// The limit passed to underlying service to specify the max results
    /// that could return per-request.
    ///
    /// Users could use this to control the memory usage of list operation.
    pub fn limit(mut self, v: usize) -> Self {
        self.args.limit = Some(v);
        self
    }

    /// The start_after passes to underlying service to specify the specified key
    /// to start listing from.
    pub fn start_after(mut self, v: &str) -> Self {
        self.args.start_after = Some(v.to_string());
        self
    }

    /// The recursive is used to control whether the list operation is recursive.
    ///
    /// - If `false`, list operation will only list the entries under the given path.
    /// - If `true`, list operation will list all entries that starts with given path.
    ///
    /// Default to `false`.
    pub fn recursive(mut self, v: bool) -> Self {
        self.args.recursive = v;
        self
    }

    /// Controls whether the `list` operation should return file versions.
    ///
    /// This function allows you to specify if the `list` operation, when executed, should include
    /// information about different versions of files, if versioning is supported and enabled.
    ///
    /// If `true`, subsequent `list` operations will include version information for each file.
    /// If `false`, version information will be omitted from the `list` results.
    ///
    /// Default to `false`
    pub fn versions(mut self, v: bool) -> Self {
        self.args.versions = v;
        self
    }

    /// Controls whether the `list` operation should include deleted files (or versions).
    ///
    /// This function allows you to specify if the `list` operation, when executed, should include
    /// entries for files or versions that have been marked as deleted. This is particularly relevant
    /// in object storage systems that support soft deletion or versioning.
    ///
    /// If `true`, subsequent `list` operations will include deleted files or versions.
    /// If `false`, deleted files or versions will be excluded from the `list` results.
    pub fn deleted(mut self, v: bool) -> Self {
        self.args.deleted = v;
        self
    }
}

/// Future that generated by [`Operator::copy_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureCopy<F> = OperatorFuture<(options::CopyOptions, String), (), F>;

impl<F: Future<Output = Result<()>>> FutureCopy<F> {
    /// Sets the condition that copy operation will succeed only if target does not exist.
    ///
    /// Refer to [`options::CopyOptions::if_not_exists`] for more details.
    ///
    /// ### Example
    ///
    /// ```
    /// # use opendal::Result;
    /// # use opendal::Operator;
    ///
    /// # async fn test(op: Operator) -> Result<()> {
    /// let _ = op
    ///     .copy_with("source/path", "target/path")
    ///     .if_not_exists(true)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn if_not_exists(mut self, v: bool) -> Self {
        self.args.0.if_not_exists = v;
        self
    }
}
